package com.ruyuan.eshop.promotion.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.constants.BatchSizeConstant;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformCouponMessage;
import com.ruyuan.eshop.common.message.PlatformMessagePushMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionConditionUserBucketMessage;
import com.ruyuan.eshop.membership.domain.dto.MembershipAccountDTO;
import com.ruyuan.eshop.persona.api.PersonaApi;
import com.ruyuan.eshop.persona.domain.dto.PersonaFilterConditionDTO;
import com.ruyuan.eshop.persona.page.PersonaConditionPage;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.mq.splitter.ListSplitter;
import com.ruyuan.eshop.push.enums.InformTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormConditionCouponUserBucketListener implements MessageListenerConcurrently {

    /**
     * 账户服务
     */
    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    /**
     * 发送消息共用的线程池
     */
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    /**
     * rocketmq生产者
     */
    @Autowired
    private DefaultProducer defaultProducer;

    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {

        try {
            for (MessageExt messageExt : msgList) {

                // 1、反序列化消息
                String messageString = new String(messageExt.getBody());
                log.debug("部分用户领取优惠券用户桶消息逻辑，消息内容：{}", messageString);
                PlatformPromotionConditionUserBucketMessage message = JSON.parseObject(messageString,
                        PlatformPromotionConditionUserBucketMessage.class);

                // 2、查询桶内的用户信息
                Integer shardId = message.getShardId();
                // 根据分片id，和分片数量大小，计算出本次分片的起始userid
                Long startUserId = (shardId.longValue() - 1) * 1000;
                Integer bucketSize = message.getBucketSize();
                String personaFilterCondition = message.getPersonaFilterCondition();
                PersonaFilterConditionDTO personaFilterConditionDTO = JSON.parseObject(
                        personaFilterCondition,
                        PersonaFilterConditionDTO.class);
                // 封装查询用户id的条件
                PersonaConditionPage page = PersonaConditionPage.builder()
                        .memberPoint(personaFilterConditionDTO.getMemberPoint())
                        .memberLevel(personaFilterConditionDTO.getMemberLevel())
                        .offset(startUserId)
                        .limit(bucketSize)
                        .build();
                // 从用户画像系统查询用户账号id
                JsonResult<List<Long>> accountIdsResult = personaApi.getAccountIdsByIdLimit(page);
                if (!accountIdsResult.getSuccess()) {
                    throw new BaseBizException(accountIdsResult.getErrorCode(), accountIdsResult.getErrorMessage());
                }
                List<Long> accountIds = accountIdsResult.getData();
                if (CollectionUtils.isEmpty(accountIds)) {
                    log.info("根据用户桶内的分片信息没有查询到用户, shardId={}", shardId);
                    continue;
                }

                // 3、每个用户发送一条领取优惠券的消息通知
                PlatformMessagePushMessage pushMessage = PlatformMessagePushMessage.builder()
                        .message("恭喜您获得优惠券领取资格，点击www.ruyuan2020.com进入活动页面")
                        .mainMessage("获得优惠券领取资格")
                        .informType(InformTypeEnum.APP.getCode())
                        .build();

                List<String> messages = new ArrayList<>();
                for (Long accountId : accountIds) {
                    pushMessage.setUserAccountId(accountId);
                    messages.add(JSON.toJSONString(pushMessage));
                }
                log.info("本次推送消息数量,{}",messages.size());
                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while (splitter.hasNext()){
                    List<String> sendBatch = splitter.next();
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_TOPIC, sendBatch, "平台发送优惠券消息");
                    });
                }
            }
        } catch (Exception e){
            log.error("consume error,消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}
